#!/usr/bin/env python3
import os
import sys
from math import nan, sqrt
from random import randrange

import numpy as np
import pandas as pd
from scipy import stats

CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))

from pure_http_client import ClickHouseClient


# unpooled variance z-test for proportions of two samples
def twosample_proportion_ztest(s1, s2, t1, t2, alpha):
    if s1 == 0 or s2 == 0 or s1 > t1 or s2 > t2 or t1 + t2 == 0:
        return nan, nan, nan, nan

    p1 = s1 / t1
    p2 = s2 / t2
    se = sqrt(p1 * (1 - p1) / t1 + p2 * (1 - p2) / t2)
    if se == 0:
        return nan, nan, nan, nan
    z_stat = (p1 - p2) / se

    one_side = 1 - stats.norm.cdf(abs(z_stat))
    p_value = one_side * 2

    z = stats.norm.ppf(1 - 0.5 * alpha)
    ci_lower = (p1 - p2) - z * se
    ci_upper = (p1 - p2) + z * se

    return z_stat, p_value, ci_lower, ci_upper


def test_and_check(name, z_stat, p_value, ci_lower, ci_upper, precision=1e-2):
    client = ClickHouseClient()
    real = client.query_return_df(
        "SELECT roundBankers({}.1, 16) as z_stat, ".format(name)
        + "roundBankers({}.2, 16) as p_value, ".format(name)
        + "roundBankers({}.3, 16) as ci_lower, ".format(name)
        + "roundBankers({}.4, 16) as ci_upper ".format(name)
        + "FORMAT TabSeparatedWithNames;"
    )
    real_z_stat = real["z_stat"][0]
    real_p_value = real["p_value"][0]
    real_ci_lower = real["ci_lower"][0]
    real_ci_upper = real["ci_upper"][0]
    assert (np.isnan(real_z_stat) and np.isnan(z_stat)) or abs(
        real_z_stat - np.float64(z_stat)
    ) < precision, "clickhouse_z_stat {}, py_z_stat {}".format(real_z_stat, z_stat)
    assert (np.isnan(real_p_value) and np.isnan(p_value)) or abs(
        real_p_value - np.float64(p_value)
    ) < precision, "clickhouse_p_value {}, py_p_value {}".format(real_p_value, p_value)
    assert (np.isnan(real_ci_lower) and np.isnan(ci_lower)) or abs(
        real_ci_lower - np.float64(ci_lower)
    ) < precision, "clickhouse_ci_lower {}, py_ci_lower {}".format(
        real_ci_lower, ci_lower
    )
    assert (np.isnan(real_ci_upper) and np.isnan(ci_upper)) or abs(
        real_ci_upper - np.float64(ci_upper)
    ) < precision, "clickhouse_ci_upper {}, py_ci_upper {}".format(
        real_ci_upper, ci_upper
    )


def test_mean_ztest():
    counts = [0, 0]
    nobs = [0, 0]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        10, 10, 10, 10, 0.05
    )

    counts = [10, 10]
    nobs = [10, 10]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        10, 10, 10, 10, 0.05
    )

    counts = [16, 16]
    nobs = [16, 18]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [10, 20]
    nobs = [30, 40]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [20, 10]
    nobs = [40, 30]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [randrange(10, 20), randrange(10, 20)]
    nobs = [
        randrange(counts[0] + 1, counts[0] * 2),
        randrange(counts[1], counts[1] * 2),
    ]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [randrange(1, 100), randrange(1, 200)]
    nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 3)]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [randrange(1, 200), randrange(1, 100)]
    nobs = [randrange(counts[0], counts[0] * 3), randrange(counts[1], counts[1] * 2)]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )

    counts = [randrange(1, 1000), randrange(1, 1000)]
    nobs = [randrange(counts[0], counts[0] * 2), randrange(counts[1], counts[1] * 2)]
    z_stat, p_value, ci_lower, ci_upper = twosample_proportion_ztest(
        counts[0], counts[1], nobs[0], nobs[1], 0.05
    )
    test_and_check(
        "proportionsZTest(%d, %d, %d, %d, 0.95, 'unpooled')"
        % (counts[0], counts[1], nobs[0], nobs[1]),
        z_stat,
        p_value,
        ci_lower,
        ci_upper,
    )


if __name__ == "__main__":
    test_mean_ztest()
    print("Ok.")
